package drds.data_propagate.server;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MigrateMap;
import drds.data_propagate.common.AbstractLifeCycle;
import drds.data_propagate.entry.ClientId;
import drds.data_propagate.entry.Entry;
import drds.data_propagate.entry.Message;
import drds.data_propagate.entry.position.BinLogEventPosition;
import drds.data_propagate.entry.position.Position;
import drds.data_propagate.entry.position.PositionRange;
import drds.data_propagate.instance.Instance;
import drds.data_propagate.instance.InstanceGenerator;
import drds.data_propagate.store.Event;
import drds.data_propagate.store.EventStore;
import drds.data_propagate.store.Events;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;


@Slf4j
public class ServerImpl extends AbstractLifeCycle implements drds.data_propagate.server.Server, Service {

    private Map<String, Instance> destinationToInstanceMap;

    private InstanceGenerator instanceGenerator;


    public ServerImpl() {
        // 希望也保留用户new单独实例的需求,兼容历史
    }

    public static ServerImpl instance() {
        return SingletonHolder.SERVER;
    }

    public void start() {
        if (!isStart()) {
            super.start();

            destinationToInstanceMap = MigrateMap.makeComputingMap(new Function<String, Instance>() {

                public Instance apply(String destination) {
                    return instanceGenerator.generate(destination);
                }
            });


        }
    }

    public void stop() {
        super.stop();
        for (Map.Entry<String, Instance> entry : destinationToInstanceMap.entrySet()) {
            try {
                Instance instance = entry.getValue();
                if (instance.isStart()) {
                    try {
                        String destination = entry.getKey();
                        MDC.put("destination", destination);
                        entry.getValue().stop();
                        log.info("stop CanalInstances[{}] successfully", destination);
                    } finally {
                        MDC.remove("destination");
                    }
                }
            } catch (Exception e) {
                log.error(String.format("stop Instance[%s] has an error", entry.getKey()), e);
            }
        }

    }

    public void start(final String destination) {
        final Instance instance = destinationToInstanceMap.get(destination);
        if (!instance.isStart()) {
            try {
                MDC.put("destination", destination);

                instance.start();
                log.info("start CanalInstances[{}] successfully", destination);
            } finally {
                MDC.remove("destination");
            }
        }
    }

    public void stop(String destination) {
        Instance instance = destinationToInstanceMap.remove(destination);
        if (instance != null) {
            if (instance.isStart()) {
                try {
                    MDC.put("destination", destination);
                    instance.stop();

                    log.info("stop CanalInstances[{}] successfully", destination);
                } finally {
                    MDC.remove("destination");
                }
            }
        }
    }

    public boolean isStart(String destination) {
        return destinationToInstanceMap.containsKey(destination) && destinationToInstanceMap.get(destination).isStart();
    }

    /**
     * 客户端订阅，重复订阅时会更新对应的filter信息
     */

    public void subscribe(ClientId clientId) throws Exception {
        checkStart(clientId.getDestination());

        Instance instance = destinationToInstanceMap.get(clientId.getDestination());
        if (!instance.getMetaDataManager().isStart()) {
            instance.getMetaDataManager().start();
        }

        instance.getMetaDataManager().subscribe(clientId); // 执行一下meta订阅

        Position position = instance.getMetaDataManager().getPosition(clientId);
        if (position == null) {
            position = instance.getEventStore().getFirstBinLogEventPosition();// 获取一下store中的第一条
            if (position != null) {
                instance.getMetaDataManager().updatePosition(clientId, position); // 更新一下cursor
            }
            log.info("subscribe successfully, {} with first readedIndex:{} ", clientId, position);
        } else {
            log.info("subscribe successfully, use last cursor readedIndex:{} ", clientId, position);
        }

        // 通知下订阅关系变化
        instance.subscribe(clientId);
    }

    /**
     * 取消订阅
     */

    public void unsubscribe(ClientId clientId) throws Exception {
        Instance instance = destinationToInstanceMap.get(clientId.getDestination());
        instance.getMetaDataManager().unsubscribe(clientId); // 执行一下meta订阅

        log.info("unsubscribe successfully, {}", clientId);
    }

    /**
     * 查询所有的订阅信息
     */
    public List<ClientId> listAllSubscribe(String destination) throws Exception {
        Instance instance = destinationToInstanceMap.get(destination);
        return instance.getMetaDataManager().listAllSubscribeInfo(destination);
    }

    /**
     * 获取数据
     *
     * <pre>
     * 注意： meta获取和数据的获取需要保证顺序性，优先拿到meta的，一定也会是优先拿到数据，所以需要加同步. (不能出现先拿到meta，拿到第二批数据，这样就会导致数据顺序性出现问题)
     * </pre>
     */

    public Message get(ClientId clientId, int batchSize) throws Exception {
        return get(clientId, batchSize, null, null);
    }

    /**
     * 获取数据，可以指定超时时间.
     *
     * <pre>
     * 几种case:
     * a. 如果timeout为null，则采用tryGet方式，即时获取
     * b. 如果timeout不为null
     *    1. timeout为0，则采用get阻塞方式，获取数据，不设置超时，直到有足够的batchSize数据才返回
     *    2. timeout不为0，则采用get+timeout方式，获取数据，超时还没有batchSize足够的数据，有多少返回多少
     *
     * 注意： meta获取和数据的获取需要保证顺序性，优先拿到meta的，一定也会是优先拿到数据，所以需要加同步. (不能出现先拿到meta，拿到第二批数据，这样就会导致数据顺序性出现问题)
     * </pre>
     */

    public Message get(ClientId clientId, int batchSize, Long timeout, TimeUnit timeUnit)
            throws Exception {
        checkStart(clientId.getDestination());
        checkSubscribe(clientId);
        Instance instance = destinationToInstanceMap.get(clientId.getDestination());
        synchronized (instance) {
            // 获取到流式数据中的最后一批获取的位置
            PositionRange<BinLogEventPosition> positionRange = instance.getMetaDataManager().getLastestPositionRange(clientId);

            if (positionRange != null) {
                throw new Exception(
                        String.format("clientId:%s has last batch:[%s] isn't ack , maybe loss data",
                                clientId.getClientId(), positionRange));
            }

            Events<Event> events = null;
            Position start = instance.getMetaDataManager().getPosition(clientId);
            events = getEvents(instance.getEventStore(), start, batchSize, timeout, timeUnit);

            if (CollectionUtils.isEmpty(events.getEventList())) {
                log.debug("get successfully, clientId:{} batchSize:{} but result is null", clientId.getClientId(),
                        batchSize);
                return new Message(-1, new ArrayList()); // 返回空包，避免生成batchId，浪费性能
            } else {
                // 记录到流式信息
                Long batchId = instance.getMetaDataManager().addPositionRange(clientId, events.getPositionRange());

                List entryList = null;
                entryList = Lists.transform(events.getEventList(), new Function<Event, Entry>() {

                    public Entry apply(Event event) {
                        return event.getEntry();
                    }
                });
                if (log.isInfoEnabled()) {
                    log.info(
                            "get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , readedIndex:{}]",
                            clientId.getClientId(), batchSize, entryList.size(), batchId, events.getPositionRange());
                }
                // 直接提交ack
                ack(clientId, batchId);
                return new Message(batchId, entryList);
            }
        }
    }

    /**
     * 不指定 readedIndex 获取事件。canal 会记住此 client 最新的 readedIndex。 <br/>
     * 如果是第一次 fetchNextBinlogEvent，则会从 canal 中保存的最老一条数据开始输出。
     *
     * <pre>
     * 注意： meta获取和数据的获取需要保证顺序性，优先拿到meta的，一定也会是优先拿到数据，所以需要加同步. (不能出现先拿到meta，拿到第二批数据，这样就会导致数据顺序性出现问题)
     * </pre>
     */

    public Message getWithoutAck(ClientId clientId, int batchSize) throws Exception {
        return getWithoutAck(clientId, batchSize, null, null);
    }

    /**
     * 不指定 readedIndex 获取事件。canal 会记住此 client 最新的 readedIndex。 <br/>
     * 如果是第一次 fetchNextBinlogEvent，则会从 canal 中保存的最老一条数据开始输出。
     *
     * <pre>
     * 几种case:
     * a. 如果timeout为null，则采用tryGet方式，即时获取
     * b. 如果timeout不为null
     *    1. timeout为0，则采用get阻塞方式，获取数据，不设置超时，直到有足够的batchSize数据才返回
     *    2. timeout不为0，则采用get+timeout方式，获取数据，超时还没有batchSize足够的数据，有多少返回多少
     *
     * 注意： meta获取和数据的获取需要保证顺序性，优先拿到meta的，一定也会是优先拿到数据，所以需要加同步. (不能出现先拿到meta，拿到第二批数据，这样就会导致数据顺序性出现问题)
     * </pre>
     */

    public Message getWithoutAck(ClientId clientId, int batchSize, Long timeout, TimeUnit timeUnit)
            throws Exception {
        checkStart(clientId.getDestination());
        checkSubscribe(clientId);

        Instance instance = destinationToInstanceMap.get(clientId.getDestination());
        synchronized (instance) {
            // 获取到流式数据中的最后一批获取的位置
            PositionRange<BinLogEventPosition> positionRange = instance.getMetaDataManager().getLastestPositionRange(clientId);

            Events<Event> events = null;
            if (positionRange != null) { // 存在流数据
                events = getEvents(instance.getEventStore(), positionRange.getStart(), batchSize, timeout, timeUnit);
            } else {// ack后第一次获取
                Position start = instance.getMetaDataManager().getPosition(clientId);
                if (start == null) { // 第一次，还没有过ack记录，则获取当前store中的第一条
                    start = instance.getEventStore().getFirstBinLogEventPosition();
                }

                events = getEvents(instance.getEventStore(), start, batchSize, timeout, timeUnit);
            }

            if (CollectionUtils.isEmpty(events.getEventList())) {
                // log.debug("getWithoutAck successfully, clientId:{}
                // batchSize:{} but result
                // is null",
                // clientId.getClientId(),
                // batchSize);
                return new Message(-1, new ArrayList()); // 返回空包，避免生成batchId，浪费性能
            } else {
                // 记录到流式信息
                Long batchId = instance.getMetaDataManager().addPositionRange(clientId, events.getPositionRange());

                List<Entry> entryList = Lists.transform(events.getEventList(), new Function<Event, Entry>() {

                    public Entry apply(Event input) {
                        return input.getEntry();
                    }
                });
                if (log.isInfoEnabled()) {
                    log.info(
                            "getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , readedIndex:{}]",
                            clientId.getClientId(), batchSize, entryList.size(), batchId, events.getPositionRange());
                }
                return new Message(batchId, entryList);
            }

        }
    }

    /**
     * 查询当前未被ack的batch列表，batchId会按照从小到大进行返回
     */
    public List<Long> listBatchIds(ClientId clientId) throws Exception {
        checkStart(clientId.getDestination());
        checkSubscribe(clientId);

        Instance instance = destinationToInstanceMap.get(clientId.getDestination());
        Map<Long, PositionRange> batchs = instance.getMetaDataManager().listAllBatchs(clientId);
        List<Long> result = new ArrayList<Long>(batchs.keySet());
        Collections.sort(result);
        return result;
    }

    /**
     * 进行 batch id 的确认。确认之后，小于等于此 batchId 的 Message 都会被确认。
     *
     * <pre>
     * 注意：进行反馈时必须按照batchId的顺序进行ack(需有客户端保证)
     * </pre>
     */

    public void ack(ClientId clientId, long batchId) throws Exception {
        checkStart(clientId.getDestination());
        checkSubscribe(clientId);

        Instance instance = destinationToInstanceMap.get(clientId.getDestination());
        PositionRange<BinLogEventPosition> positionRange = null;
        positionRange = instance.getMetaDataManager().removePositionRange(clientId, batchId); // 更新位置
        if (positionRange == null) { // 说明是重复的ack/rollback
            throw new Exception(String.format(
                    "ack error , clientId:%s batchId:%d is not exist , please check", clientId.getClientId(), batchId));
        }

        // 更新cursor最好严格判断下位置是否有跳跃更新
        // Position readedIndex = lastRollbackPostions.get(clientId);
        // if (readedIndex != null) {
        // // Position readedIndex =
        // canalInstance.getMetaDataManager().getPosition(clientId);
        // BinlogEventPosition minPosition =
        // EventUtils.min(positionRanges.getStart(), (BinlogEventPosition)
        // readedIndex);
        // if (minPosition == readedIndex) {// ack的position要晚于该最后ack的位置，可能有丢数据
        // throw new DataPropagateServerException(
        // String.format(
        // "ack error , clientId:%s batchId:%d %s is jump ack , last ack:%s",
        // clientId.getClientId(), batchId, positionRanges,
        // readedIndex));
        // }
        // }

        // 更新cursor
        if (positionRange.getAck() != null) {
            instance.getMetaDataManager().updatePosition(clientId, positionRange.getAck());
            if (log.isInfoEnabled()) {
                log.info("ack successfully, clientId:{} batchId:{} readedIndex:{}", clientId.getClientId(), batchId,
                        positionRange);
            }
        }

        // 可定时清理数据
        instance.getEventStore().ack(positionRange.getEnd());

    }

    /**
     * 回滚到未进行 {@link #ack} 的地方，下次fetch的时候，可以从最后一个没有 {@link #ack} 的地方开始拿
     */

    public void rollback(ClientId clientId) throws Exception {
        checkStart(clientId.getDestination());
        Instance instance = destinationToInstanceMap.get(clientId.getDestination());
        // 因为存在第一次链接时自动rollback的情况，所以需要忽略未订阅
        boolean hasSubscribe = instance.getMetaDataManager().hasSubscribe(clientId);
        if (!hasSubscribe) {
            return;
        }

        synchronized (instance) {
            // 清除batch信息,rollback eventStore中的状态信息(all)
            instance.getMetaDataManager().clearAllBatchs(clientId);
            instance.getEventStore().rollback();
            log.info("rollback successfully, clientId:{}", new Object[]{clientId.getClientId()});
        }
    }

    /**
     * 回滚到未进行 {@link #ack} 的地方，下次fetch的时候，可以从最后一个没有 {@link #ack} 的地方开始拿
     */

    public void rollback(ClientId clientId, Long batchId) throws Exception {
        checkStart(clientId.getDestination());
        Instance instance = destinationToInstanceMap.get(clientId.getDestination());

        // 因为存在第一次链接时自动rollback的情况，所以需要忽略未订阅
        boolean hasSubscribe = instance.getMetaDataManager().hasSubscribe(clientId);
        if (!hasSubscribe) {
            return;
        }
        synchronized (instance) {
            // 清除batch信息,rollback eventStore中的状态信息(部分)
            // 清除batch信息
            PositionRange<BinLogEventPosition> positionRange = instance.getMetaDataManager().removePositionRange(clientId, batchId);
            if (positionRange == null) { // 说明是重复的ack/rollback
                throw new Exception(
                        String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
                                clientId.getClientId(), batchId));
            }
            // lastRollbackPostions.put(clientId,
            // positionRanges.getEnd());// 记录一下最后rollback的位置
            // TODO 后续rollback到指定的batchId位置
            instance.getEventStore().rollback();// rollback
            // eventStore中的状态信息
            log.info("rollback successfully, clientId:{} batchId:{} readedIndex:{}", clientId.getClientId(), batchId,
                    positionRange);
        }
    }

    public Map<String, Instance> getDestinationToInstanceMap() {
        return Maps.newHashMap(destinationToInstanceMap);
    }

    /**
     * 根据不同的参数，选择不同的方式获取数据
     */
    private Events<Event> getEvents(EventStore eventStore, Position start, int batchSize, Long timeout, TimeUnit unit) throws Exception {
        if (timeout == null) {
            return eventStore.tryGet(start, batchSize);
        } else {
            try {
                if (timeout <= 0) {
                    return eventStore.get(start, batchSize);
                } else {
                    return eventStore.get(start, batchSize, timeout, unit);
                }
            } catch (Exception e) {
                throw new Exception(e);
            }
        }
    }

    // ======================== helper method =======================

    private void checkSubscribe(ClientId clientId) throws Exception {
        Instance instance = destinationToInstanceMap.get(clientId.getDestination());
        boolean hasSubscribe = instance.getMetaDataManager().hasSubscribe(clientId);
        if (!hasSubscribe) {
            throw new Exception(
                    String.format("ClientId:%s should subscribe first", clientId.toString()));
        }
    }

    private void checkStart(String destination) throws Exception {
        if (!isStart(destination)) {
            throw new Exception(String.format("destination:%s should start first", destination));
        }
    }


    public void setInstanceGenerator(InstanceGenerator instanceGenerator) {
        this.instanceGenerator = instanceGenerator;
    }


    private static class SingletonHolder {

        private static final ServerImpl SERVER = new ServerImpl();
    }

}
